package com.imooc.spark

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by zghgchao 2017/11/17 9:34
  * Spark Streaming integration with Kafka -- Direct Approach (no receivers)
  */
object KafkaDirectWordCount {

  /**
    * Entry point: word count over Kafka messages read through a direct stream.
    *
    * Expects exactly two arguments:
    *   - `brokers`: passed to Kafka as `metadata.broker.list`
    *   - `topics`: comma-separated list of topic names to consume
    *
    * Message values are split on single spaces, counted per 5-second batch,
    * and the counts are printed via `DStream.print()`.
    *
    * @param args command-line arguments: `<brokers> <topics>`
    */
  def main(args: Array[String]): Unit = {

    if (args.length != 2) {
      System.err.println("Usage：KafkaDirectWordCount <brokers> <topics> ")
      System.exit(1)
    }

    val Array(brokers, topics) = args

    // Tolerate input such as "a, b" or "a,,b": trim each entry and drop blanks so
    // that no whitespace-padded or empty topic name is handed to Kafka.
    val topicSet = topics.split(",").map(_.trim).filter(_.nonEmpty).toSet
    if (topicSet.isEmpty) {
      System.err.println(s"No valid topic names found in <topics>: '$topics'")
      System.exit(1)
    }

    // App name and master are intentionally not set here — presumably supplied
    // externally (e.g. by spark-submit). For a local run, chain
    // .setAppName("KafkaDirectWordCount").setMaster("local[2]") onto the conf.
    val sparkConf = new SparkConf()
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)

    // Direct approach: the stream yields (key, value) pairs read straight from the brokers.
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicSet
    )

    // Only the message value (second tuple element) takes part in the word count.
    messages
      .map(_._2)
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
